{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Install Dependencies\n",
    "\n",
    "If you are running on Google Colab, you need to install the necessary dependencies before beginning the exercise.\n",
    "\n",
    "**NOTE**: After installing the dependencies, you need to click on \"RESTART RUNTIME\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print('NOTE: Intentionally crashing session to use the newly installed library.\\n')\n",
    "\n",
    "!pip uninstall -y pyarrow\n",
    "!pip install ray[debug]==0.7.5\n",
    "\n",
    "# A hack to force the runtime to restart, needed to include the above dependencies.\n",
    "import os\n",
    "os._exit(0)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "r-eaEv_CmqDq"
   },
   "source": [
    "# Exercise 6 - Handling Slow Tasks\n",
    "\n",
    "**GOAL:** The goal of this exercise is to show how to use `ray.wait` to avoid waiting for slow tasks.\n",
    "\n",
    "See the documentation for `ray.wait` at https://ray.readthedocs.io/en/latest/package-ref.html?highlight=ray.wait#ray.wait.\n",
    "\n",
    "### Concepts for this Exercise - ray.wait\n",
    "\n",
    "After launching a number of tasks, you may want to know which ones have finished executing. This can be done with `ray.wait`. The function works as follows.\n",
    "\n",
    "```python\n",
    "ready_ids, remaining_ids = ray.wait(object_ids, num_returns=1, timeout=None)\n",
    "```\n",
    "\n",
    "**Arguments:**\n",
    "- `object_ids`: This is a list of object IDs.\n",
    "- `num_returns`: This is maximum number of object IDs to wait for. The default value is `1`.\n",
    "- `timeout`: This is the maximum amount of time in milliseconds to wait for. So `ray.wait` will block until either `num_returns` objects are ready or until `timeout` milliseconds have passed.\n",
    "\n",
    "**Return values:**\n",
    "- `ready_ids`: This is a list of object IDs that are available in the object store.\n",
    "- `remaining_ids`: This is a list of the IDs that were in `object_ids` but are not in `ready_ids`, so the IDs in `ready_ids` and `remaining_ids` together make up all the IDs in `object_ids`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "4G423eU3mqDs"
   },
   "outputs": [],
   "source": [
    "from __future__ import absolute_import\n",
    "from __future__ import division\n",
    "from __future__ import print_function\n",
    "\n",
    "import numpy as np\n",
    "import ray\n",
    "import time\n",
    "\n",
    "print('Successfully imported ray!')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "XcVSx4CEmqDw"
   },
   "outputs": [],
   "source": [
    "ray.init(num_cpus=6, ignore_reinit_error=True)\n",
    "\n",
    "# Sleep a little to improve the accuracy of the timing measurements used below,\n",
    "# because some workers may still be starting up in the background.\n",
    "time.sleep(2.0)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "dNrYSPhFmqDy"
   },
   "source": [
    "Define a remote function that takes a variable amount of time to run."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "Xr17LNkqmqDz"
   },
   "outputs": [],
   "source": [
    "@ray.remote\n",
    "def f(i):\n",
    "    np.random.seed(5 + i)\n",
    "    x = np.random.uniform(0, 4)\n",
    "    time.sleep(x)\n",
    "    return i, time.time()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "KyQRXQM4mqD1"
   },
   "source": [
    "**EXERCISE:** Using `ray.wait`, change the code below so that `initial_results` consists of the outputs of the first three tasks to complete instead of the first three tasks that were submitted."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "bEacrF9zmqD2"
   },
   "outputs": [],
   "source": [
    "start_time = time.time()\n",
    "\n",
    "# This launches 6 tasks, each of which takes a random amount of time to\n",
    "# complete.\n",
    "result_ids = [f.remote(i) for i in range(6)]\n",
    "# Get one batch of tasks. Instead of waiting for a fixed subset of tasks, we\n",
    "# should instead use the first 3 tasks that finish.\n",
    "initial_results = ray.get(result_ids[:3])\n",
    "\n",
    "end_time = time.time()\n",
    "duration = end_time - start_time"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "65zF3catmqD4"
   },
   "source": [
    "**EXERCISE:** Change the code below so that `remaining_results` consists of the outputs of the last three tasks to complete."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "6vfvmSvymqEV"
   },
   "outputs": [],
   "source": [
    "# Wait for the remaining tasks to complete.\n",
    "remaining_results = ray.get(result_ids[3:])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "zzVeSWQcmqEY"
   },
   "source": [
    "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "POavRmrsmqEZ"
   },
   "outputs": [],
   "source": [
    "assert len(initial_results) == 3\n",
    "assert len(remaining_results) == 3\n",
    "\n",
    "initial_indices = [result[0] for result in initial_results]\n",
    "initial_times = [result[1] for result in initial_results]\n",
    "remaining_indices = [result[0] for result in remaining_results]\n",
    "remaining_times = [result[1] for result in remaining_results]\n",
    "\n",
    "assert set(initial_indices + remaining_indices) == set(range(6))\n",
    "\n",
    "assert duration < 1.5, ('The initial batch of ten tasks was retrieved in '\n",
    "                        '{} seconds. This is too slow.'.format(duration))\n",
    "\n",
    "assert duration > 0.8, ('The initial batch of ten tasks was retrieved in '\n",
    "                        '{} seconds. This is too slow.'.format(duration))\n",
    "\n",
    "# Make sure the initial results actually completed first.\n",
    "assert max(initial_times) < min(remaining_times)\n",
    "\n",
    "print('Success! The example took {} seconds.'.format(duration))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "IK6AxwYXm_MP"
   },
   "source": [
    "# Exercise 7 - Process Tasks in Order of Completion\n",
    "\n",
    "**GOAL:** The goal of this exercise is to show how to use `ray.wait` to process tasks in the order that they finish.\n",
    "\n",
    "See the documentation for ray.wait at https://ray.readthedocs.io/en/latest/package-ref.html?highlight=ray.wait#ray.wait.\n",
    "\n",
    "## Concepts for this exercise - `ray.wait`\n",
    "\n",
    "After launching a number of tasks, you may want to run the results sequentially. To do so, we build off of exercise 6 and use `ray.wait` to execute the results sequentially. \n",
    "\n",
    "We are able to use `ray.wait` because the two lists returned by **`ray.wait` maintains the ordering of the input list**. That is, if `f` is a remote function, the code \n",
    "```python\n",
    "    results = ray.wait([f.remote(i) for i in range(100)], num_returns=10)\n",
    "```\n",
    "will return `(ready_list, remain_list)` and the `ObjectID`s of in those lists will be ordered by the argument passed to `f` above."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/",
     "height": 351
    },
    "colab_type": "code",
    "executionInfo": {
     "elapsed": 355,
     "status": "error",
     "timestamp": 1569887890881,
     "user": {
      "displayName": "",
      "photoUrl": "",
      "userId": ""
     },
     "user_tz": 420
    },
    "id": "sXln7U76m_v7",
    "outputId": "9487f7f1-ba84-49ac-ad0f-bd8d74ce0c6a"
   },
   "outputs": [],
   "source": [
    "from __future__ import absolute_import\n",
    "from __future__ import division\n",
    "from __future__ import print_function\n",
    "\n",
    "import numpy as np\n",
    "import ray\n",
    "import time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/",
     "height": 164
    },
    "colab_type": "code",
    "executionInfo": {
     "elapsed": 357,
     "status": "error",
     "timestamp": 1569887907853,
     "user": {
      "displayName": "",
      "photoUrl": "",
      "userId": ""
     },
     "user_tz": 420
    },
    "id": "Qs3jUiK0nCwj",
    "outputId": "d7d8c1fd-c919-4184-da72-1b25c38723cd"
   },
   "outputs": [],
   "source": [
    "ray.init(num_cpus=5, include_webui=False, ignore_reinit_error=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/",
     "height": 215
    },
    "colab_type": "code",
    "executionInfo": {
     "elapsed": 465,
     "status": "error",
     "timestamp": 1569887912475,
     "user": {
      "displayName": "",
      "photoUrl": "",
      "userId": ""
     },
     "user_tz": 420
    },
    "id": "idoP1vsgnG5z",
    "outputId": "98634762-92c5-4597-a997-806c9ffbacb4"
   },
   "outputs": [],
   "source": [
    "@ray.remote\n",
    "def f():\n",
    "    time.sleep(np.random.uniform(0, 5))\n",
    "    return time.time()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/",
     "height": 150
    },
    "colab_type": "code",
    "executionInfo": {
     "elapsed": 331,
     "status": "error",
     "timestamp": 1569887921445,
     "user": {
      "displayName": "",
      "photoUrl": "",
      "userId": ""
     },
     "user_tz": 420
    },
    "id": "QY2xr5jRnIAN",
    "outputId": "2ccc7a5c-7187-45b8-f736-4ffe58db6d78"
   },
   "source": [
    "**EXERCISE:** Change the code below to use `ray.wait` to get the results of the tasks in the order that they complete.\n",
    "\n",
    "**NOTE:** It would be a simple modification to maintain a pool of 10 experiments and to start a new experiment whenever one finishes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/",
     "height": 232
    },
    "colab_type": "code",
    "executionInfo": {
     "elapsed": 502,
     "status": "error",
     "timestamp": 1569887929302,
     "user": {
      "displayName": "",
      "photoUrl": "",
      "userId": ""
     },
     "user_tz": 420
    },
    "id": "Tqwm2XJWnKOe",
    "outputId": "162f600d-28b9-4e30-88ef-3783ae86f186"
   },
   "outputs": [],
   "source": [
    "start_time = time.time()\n",
    "\n",
    "remaining_result_ids = [f.remote() for _ in range(10)]\n",
    "\n",
    "# Get the results.\n",
    "results = []\n",
    "while len(remaining_result_ids) > 0:\n",
    "    # EXERCISE: Instead of simply waiting for the first result from\n",
    "    # remaining_result_ids, use ray.wait to get the first one to finish.\n",
    "    result_id = remaining_result_ids[0]\n",
    "    remaining_result_ids = remaining_result_ids[1:]\n",
    "    result = ray.get(result_id)\n",
    "    results.append(result)\n",
    "    print('Processing result which finished after {} seconds.'\n",
    "          .format(result - start_time))    \n",
    "\n",
    "end_time = time.time()\n",
    "duration = end_time - start_time"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "colab_type": "text",
    "id": "TnkpYNBHnPTE"
   },
   "source": [
    "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {},
    "colab_type": "code",
    "id": "1PtResifnPmr"
   },
   "outputs": [],
   "source": [
    "assert results == sorted(results), ('The results were not processed in the '\n",
    "                                    'order that they finished.')\n",
    "\n",
    "print('Success! The example took {} seconds.'.format(duration))"
   ]
  }
 ],
 "metadata": {
  "colab": {
   "name": "Ray Tutorial - In-Order Task Processing",
   "provenance": [
    {
     "file_id": "https://github.com/ray-project/tutorial/blob/master/exercises/exercise06-Wait.ipynb",
     "timestamp": 1569887975269
    }
   ],
   "toc_visible": true
  },
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": false,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {},
   "toc_section_display": true,
   "toc_window_display": true
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
